{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "**_pySpark Basics: Missing Data_**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_by Jeff Levy (jlevy@urban.org)_\n",
    "\n",
    "_Last Updated: 31 Jul 2017, Spark v2.1_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_Abstract: In this guide we'll look at how to handle null and missing values in pySpark, with a brief discussion of imputation_\n",
    "\n",
    "_Main operations used: where, isNull, dropna, fillna_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "***"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We'll load some real data from CSV to work with.  It helps to know in advance how the dataset handles missing values - are they an empty string, or something else?  Most CSVs will use empty strings, but we can't compute anything on a column that is mixed strings and numbers.  The `null` object in pySpark is what we want, and we can tell it when we import the data to replace the value our data uses to denote missing data with it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df = spark.read.csv('s3://ui-spark-social-science-public/data/Performance_2015Q1.txt', \n",
    "                    header=False, inferSchema=True, sep='|', nullValue='')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that on the `nullValue=''` line, the empty string can be replaced by whatever your dataset uses - this is telling Spark which values in the dataframe it should convert to `null`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Exploring Null Values\n",
    "\n",
    "First let's see how many rows the entire dataframe has:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "3526154"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To explore missing data in pySpark, we need to make sure we're looking in a numerical column - **the system does not insert `null` values into a column that has a string datatype.**  The general point of `null` is so the system knows to skip those rows when doing calculations down a column.  \n",
    "\n",
    "For example, the mean of the series [3, 4, 2, null, 5] is: \n",
    "\n",
    "14 / 4 = 3.5 \n",
    "\n",
    "not: \n",
    "\n",
    "14 / 5 = 2.8\n",
    "\n",
    "In other words, with proper `null` handling, [3, 4, 2, null, 5] is not the same as [3, 4, 2, 0, 5].  This distinction is not relevant in a column of strings."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('_c0', 'bigint'),\n",
       " ('_c1', 'string'),\n",
       " ('_c2', 'string'),\n",
       " ('_c3', 'double'),\n",
       " ('_c4', 'double'),\n",
       " ('_c5', 'int'),\n",
       " ('_c6', 'int'),\n",
       " ('_c7', 'int'),\n",
       " ('_c8', 'string'),\n",
       " ('_c9', 'int'),\n",
       " ('_c10', 'string'),\n",
       " ('_c11', 'string'),\n",
       " ('_c12', 'int'),\n",
       " ('_c13', 'string'),\n",
       " ('_c14', 'string'),\n",
       " ('_c15', 'string'),\n",
       " ('_c16', 'string'),\n",
       " ('_c17', 'string'),\n",
       " ('_c18', 'string'),\n",
       " ('_c19', 'string'),\n",
       " ('_c20', 'string'),\n",
       " ('_c21', 'string'),\n",
       " ('_c22', 'string'),\n",
       " ('_c23', 'string'),\n",
       " ('_c24', 'string'),\n",
       " ('_c25', 'string'),\n",
       " ('_c26', 'int'),\n",
       " ('_c27', 'string')]"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.dtypes"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For our practice purposes it doesn't matter what this data actually is, so we'll arbitrarily select a numerical column:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "3510294"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.where( df['_c12'].isNull() ).count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This command takes the form `df.where(___).count()` where the blank is replaced with the desired condition - or in a sentance, *\"Count the dataframe where `___` is True\".*  In the code I used some extra spaces in between the brackets just to make this stand out - Python ignores extra horizonal space nested in commands like that.  So here we're counting how many rows have `null` values in column `_c12`.\n",
    "\n",
    "Note that if we left the `count()` method off the end then **it would return an actual dataframe of all rows where column `C12` is `null`.**  So if you wanted more than just the count you could explore that subset.\n",
    "\n",
    "When we compare the null count to our earlier command, `df.count()`, we can see that column `C12` is mostly null values - there are 15,860 actual values in here, out of 3,526,154 rows.  A common need when exploring a dataset might be to check _all_ our numeric rows for null values.  However, the `isNull()` method can only be called on a column, not an entire dataframe, so I'll write a convenient Python function to do this for us with some comments to explain each step:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def count_nulls(df):\n",
    "    null_counts = []          #make an empty list to hold our results\n",
    "    for col in df.dtypes:     #iterate through the column data types we saw above, e.g. ('C0', 'bigint')\n",
    "        cname = col[0]        #splits out the column name, e.g. 'C0'    \n",
    "        ctype = col[1]        #splits out the column type, e.g. 'bigint'\n",
    "        if ctype != 'string': #skip processing string columns for efficiency (can't have nulls)\n",
    "            nulls = df.where( df[cname].isNull() ).count()\n",
    "            result = tuple([cname, nulls])  #new tuple, (column name, null count)\n",
    "            null_counts.append(result)      #put the new tuple in our result list\n",
    "    return null_counts\n",
    "\n",
    "null_counts = count_nulls(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('_c0', 0),\n",
       " ('_c3', 0),\n",
       " ('_c4', 1945752),\n",
       " ('_c5', 0),\n",
       " ('_c6', 0),\n",
       " ('_c7', 1),\n",
       " ('_c9', 0),\n",
       " ('_c12', 3510294),\n",
       " ('_c26', 3526153)]"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "null_counts"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A quick note about Python programming in general, for those who may be new(er) to the language: **one of the core precepts of Python is that most code needs to be _read_ even more often than it needs to be _run_.**  For the purpose of clairty I spread the code in that last function out vertically far more than was strictly necessary.  This bit of code would do the exact same thing:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "\"\"\"\n",
    "null_counts = []\n",
    "for col in df.dtypes:\n",
    "    if col[1] != 'string':\n",
    "        null_counts.append(tuple([col[0], df.where(df[col[0]].isNull()).count()])))\n",
    "\"\"\";"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "But despite accomplishing the same thing in 4 lines instead of 8, it could be argued that it violates the rules of Python style by looking like an unreadable jumble.  Much more on this can be found in the official Python PEP8 style guide, located at:\n",
    "\n",
    "https://www.python.org/dev/peps/pep-0008/\n",
    "\n",
    "If you'll be writing much Python code it's definitely worth looking over.  Note, however, that pySpark frequently violates its guidelines."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "# Dropping Null Values\n",
    "\n",
    "There are three things we can do with our `null` values now that we know what's in our dataframe.  We can **ignore them**, we can **drop them**, or we can **replace them**.  Remember, pySpark dataframes are immutable, so we can't actually change the original dataset.  All operations return an entirely new dataframe, though we can tell it to overwrite the existing one with `df = df.some_operation()` which ends up functionaly equivalent."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_drops = df.dropna(how='all', subset=['_c4', '_c12', '_c26'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1580403"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_drops.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `df.dropna()` method has two arguments here:  `how` can equal `'any'` or `'all'`; the first drops a row if _any_ value in it is `null`, the second drops a row only if _all_ values are.  \n",
    "\n",
    "The `subset` argument takes a list of columns that you want to look in for `null` values.  It does not actually subset the dataframe; it just checks in those three columns, then drops the row for the entire dataframe if that subset meets the criteria.  This can be left off if it should check all columns for `null`.\n",
    "\n",
    "So we can see above that once we drop all rows that have `null` values in columns `_c4`, `_c12` and `_c26`, we're left with 1,580,403 rows out of the original 3,526,154 we saw when we called `count` on the whole dataframe."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note: There is a third argument that `dropna()` can take; the `thresh` argument sets a threshold for the number of `null` entries in a row before it drops it.  It is set to an integer that **specifies how many non-null arguments the row must have; if it has less than that figure it drops the row.**  If you specify this argument as we do below, it returns a dataframe where any row with less than 2 non-null values in the specified subset are dropped:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_drops2 = df.dropna(thresh=2, subset=['_c4', '_c12', '_c26'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "15860"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_drops2.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This leaves us with a lot less columns than the `how='all'` version, which is what we would expect.  In the first a row must have *all three columns* as `null` to be dropped; in the second only *any one of the three* must be null to be dropped.\n",
    "\n",
    "# Replacing Null Values"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_fill = df.fillna(0, subset=['_c12'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The above line goes through all of column `_c12` and replaces `null` values with the value we specified, in this case a zero.  To verify we re-run the command on our new dataframe to count nulls that we used above:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_fill.where( df_fill['_c12'].isNull() ).count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We see it replaced all 3,510,294 nulls we found earlier.  The first term in `fillna()` can be most any type and any value, and the subset list can be left off if the fill should be applied to all columns (though be sure the dtype is consistent with what is already in that column).  Note that `df.replace(a, b)` does this same thing, only you specify `a` as the value to be replaced and `b` as the replacement.  It also accepts the optional subset list, but does not take advantage of optimized null handling."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "# Imputation\n",
    "\n",
    "There are many methods for imputing missing data based upon the values around those missing.  This includes, for example, moving average windows and fitting local linear models.  In pySpark, most of these methods will be handled by _window functions_, which you can read more about here:\n",
    "\n",
    "    https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html\n",
    "    \n",
    "These methods go beyond what we'll cover in this tutorial, though they may be covered in a future tutorial."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
